Spark Context

SparkContext is the entry gate to Apache Spark functionality. The most important step of any Spark driver application is to create a SparkContext, which allows the application to access the Spark cluster with the help of a resource manager (YARN/Mesos). To create a SparkContext, a SparkConf must be built first; the SparkConf holds the configuration parameters that the driver application passes to the SparkContext.

What is SparkContext in Apache Spark?

SparkContext is the entry point to Spark functionality. The most important step of any Spark driver application is to create a SparkContext, which allows the application to access the Spark cluster with the help of a resource manager.

How to Create a SparkContext?
To create a SparkContext, a SparkConf must be built first. The SparkConf holds the configuration parameters that our Spark driver application passes to the SparkContext. Some of these parameters define properties of the driver application itself, while others are used by Spark to allocate resources on the cluster, such as the number of executors and the memory and cores used by the executors running on the worker nodes.
In short, it tells Spark how to access the cluster. After the SparkContext object is created, we can invoke functions such as textFile, sequenceFile, parallelize, etc. The different contexts in which it can run are local, yarn-client, a Mesos URL, and a Spark URL. Once the SparkContext is created, it can be used to create RDDs, broadcast variables, and accumulators, to access Spark services, and to run jobs. All of this can be carried out until the SparkContext is stopped.

pyspark

import pyspark
from pyspark import SparkContext

# Create a SparkContext with the default configuration
sc = SparkContext()

SQLContext

from pyspark.sql import Row
from pyspark.sql import SQLContext

# Create a SQLContext on top of the existing SparkContext
sqlContext = SQLContext(sc)

Scala Spark

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// Build a SparkConf and use it to create the SparkContext
val conf = new SparkConf().setAppName(appName).setMaster(master)
val sc = new SparkContext(conf)

Stopping SparkContext
Only one SparkContext may be active per JVM. You must stop the active one before creating a new one, by calling:
stop(): Unit
It will log the following message:
INFO SparkContext: Successfully stopped SparkContext
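
For example, assuming the sc and imports from the Scala snippet above, a minimal sketch of stopping the active context before creating a new one (the new application name is a placeholder):

sc.stop()   // stop the active SparkContext
val newSc = new SparkContext(new SparkConf().setAppName("NewApp").setMaster("local[*]"))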

Spark Scala Word Count Example
Let’s see how to create a SparkContext using SparkConf with the help of a Spark-Scala word count example:
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

object Wordcount {
  def main(args: Array[String]) {
    // Build the SparkConf and create the SparkContext
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)

    if (args.length < 2) {
      println("Usage: ScalaWordCount <input> <output>")
      System.exit(1)
    }

    // Read the input file and split each line into words
    val rawData = sc.textFile(args(0))
    val words = rawData.flatMap(line => line.split(" "))

    // Count occurrences of each word
    val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _)

    // Write the result and stop the SparkContext
    wordCount.saveAsTextFile(args(1))
    sc.stop()
  }
}

Functions of SparkContext in Apache Spark

  • Get the current status of the application
  • Set the configuration
  • Cancel a job
  • Access various services
  • Closure cleaning in Spark
  • Programmable dynamic allocation
  • Register Spark listeners
  • Unpersist RDDs
  • Cancel a stage
  • Access persistent RDDs
Important Functions of SparkContext in Apache Spark

To get the current status of a Spark application
SparkEnv – It is the runtime environment holding Spark’s public services, which interact with each other to establish a distributed computing platform for a Spark application. A SparkEnv object holds the required runtime services for running a Spark application, with different environments for the driver and the executors.

SparkConf – Spark properties control most application settings and are configured separately for each application. They can easily be set on a SparkConf. Common properties, like the master URL and the application name, as well as arbitrary key-value pairs, are configured through the set() method.
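
A minimal sketch of configuring a SparkConf this way (the property values are placeholders, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("MyApp")                   // application name
  .setMaster("local[2]")                 // master URL / deployment environment
  .set("spark.executor.memory", "1g")    // arbitrary key-value property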

Deployment environment (as master URL) – Spark deployment environments are of two types: local and clustered. Local mode is a non-distributed, single-JVM deployment mode in which all the execution components – driver, executor, LocalSchedulerBackend, and master – run in the same JVM; it is therefore the only mode where the driver itself executes the work. Local mode is suitable for testing, debugging, or demonstration purposes, because it requires no earlier setup to launch a Spark application. In clustered mode, Spark runs distributed across the cluster. Learn about Spark cluster managers in detail.
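
For illustration, the deployment environment is selected through the master URL passed to setMaster (the host and port below are placeholders):

val localConf      = new SparkConf().setMaster("local[*]")            // local mode, single JVM
val standaloneConf = new SparkConf().setMaster("spark://host:7077")   // Spark standalone cluster
val yarnConf       = new SparkConf().setMaster("yarn")                // YARN cluster manager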

To set the configuration 

Master URL – The master method returns the current value of spark.master, which is the deployment environment in use.
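
For example, with the sc created above:

println(sc.master)   // prints the value of spark.master, e.g. "local[*]" or "yarn"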

Local properties – creating logical job groups – The purpose of local properties is to form logical groups of jobs by means of properties, so that separate jobs launched from different threads belong to a single logical group. We can set a local property that affects Spark jobs submitted from a thread, such as the Spark fair scheduler pool.
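
A sketch of setting such a local property on the submitting thread; the pool name "production" is only an illustrative value:

sc.setLocalProperty("spark.scheduler.pool", "production")   // jobs from this thread go to this pool
// ... submit jobs ...
sc.setLocalProperty("spark.scheduler.pool", null)           // clear the property afterwards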

Default logging level – It lets you set the root log level in a Spark application, for example, in the Spark shell.
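
For example:

sc.setLogLevel("WARN")   // valid levels include ALL, DEBUG, INFO, WARN, ERROR, OFF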

To Access various services

It also helps in accessing services like the TaskScheduler, LiveListenerBus, BlockManager, SchedulerBackend, ShuffleManager, and the optional ContextCleaner.
 

To Cancel a job

cancelJob simply requests the DAGScheduler to drop a Spark job.
Learn about the Spark DAG (Directed Acyclic Graph) in detail.
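
A sketch of cancelling jobs by group, assuming the sc created above; the group id "nightly-etl" is only an illustrative value:

sc.setJobGroup("nightly-etl", "nightly ETL jobs")   // tag jobs submitted from this thread
// ... run jobs ...
sc.cancelJobGroup("nightly-etl")                    // request the DAGScheduler to cancel them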


To Cancel a stage
cancelStage simply requests the DAGScheduler to drop a Spark stage.
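
A sketch, assuming the sc created above, that looks up an active stage through the status tracker and cancels it:

// Cancel the first active stage reported by the status tracker, if there is one
sc.statusTracker.getActiveStageIds().headOption.foreach(id => sc.cancelStage(id))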


For Closure cleaning in Spark

Spark cleans up the closure – the body of an action – every time an action occurs, before it is serialized and sent over the wire to be executed. The clean method in SparkContext does this, and it in turn calls the ClosureCleaner.clean method. It not only cleans the closure itself, but also cleans referenced closures transitively. It assumes the closure is serializable as long as it does not explicitly reference unserializable objects.
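
An illustrative sketch of this rule, assuming the sc created above (NotSerializableHolder is a made-up class for the example):

class NotSerializableHolder { val factor = 2 }
val holder = new NotSerializableHolder

val numbers = sc.parallelize(1 to 10)
// numbers.map(x => x * holder.factor).collect()   // would fail with "Task not serializable"
numbers.map(x => x * 2).collect()                  // closures referencing only serializable values run fine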
 

To Register Spark listener

We can register a custom SparkListenerInterface with the help of the addSparkListener method. We can also register custom listeners using the spark.extraListeners setting.
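
A minimal sketch of registering a custom listener via addSparkListener, assuming the sc created above (JobEndLogger is an invented class for illustration):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}

// Print a line every time a job finishes
class JobEndLogger extends SparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} ended with result ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobEndLogger)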

Programmable Dynamic allocation

It also provides the following methods as a developer API for dynamic allocation of executors: requestExecutors, killExecutors, requestTotalExecutors, and getExecutorIds.
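
A sketch of two of these developer-API calls, assuming the sc created above (they only take effect on cluster managers that support dynamic executor allocation, and the executor ids shown are placeholders):

sc.requestExecutors(2)            // ask the cluster manager for two additional executors
sc.killExecutors(Seq("1", "2"))   // release executors by id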

To access persistent RDD

getPersistentRDDs returns the collection of RDDs that have marked themselves as persistent via cache.
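
For example, with the sc created above:

val cached = sc.parallelize(1 to 100).cache()   // mark the RDD as persistent
sc.getPersistentRDDs.foreach { case (id, rdd) =>
  println(s"RDD $id is stored at level ${rdd.getStorageLevel}")
}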

To unpersist RDDs

unpersist removes the RDD from the master’s BlockManager and from the internal persistentRdds mapping.
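
Continuing the example above:

cached.unpersist()                    // drop the RDD's blocks from the executors' block managers
println(sc.getPersistentRDDs.size)    // the RDD no longer appears in the mapping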
